[ENH] Put both token id and token str in the statistics #5777
Conversation
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
Testing, Bugs, Errors, Logs, Documentation
System Compatibility
Quality
Add string label alongside index

Adds optional string labels to sparse-vector statistics so that each token is emitted in both numeric and string form.

Key Changes
• Changed …

Affected Areas
• …

This summary was automatically generated by @propel-code-bot
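As a rough sketch of the data shape this PR works toward (field names are taken from the diffs below; the actual definitions in `rust/types/src/metadata.rs` may differ):

```rust
// Sketch only: a sparse vector whose entries may optionally carry the original
// token strings, so statistics can report both the numeric index and the token.
pub struct SparseVector {
    pub indices: Vec<u32>,
    pub values: Vec<f32>,
    pub tokens: Option<Vec<String>>, // assumed: one optional string label per index
}
```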
Force-pushed: a2ec476 to a0b964a
tanujnay112 left a comment
Naming should be improved
Force-pushed: beb0dbc to 1279718
rust/types/src/metadata.rs (Outdated)
```diff
@@ -156,7 +113,7 @@ impl SparseVector {
     }
 }
```
[BestPractice]
The implementation of `from_pairs` can be simplified by using `unzip`, which is more idiomatic and likely more efficient. I've also renamed the parameter from `triples` to `pairs` for clarity.
```suggestion
pub fn from_pairs(pairs: impl IntoIterator<Item = (u32, f32)>) -> Self {
    let (indices, values) = pairs.into_iter().unzip();
    Self {
        indices,
        values,
        tokens: None,
    }
}
```
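As a usage illustration of the suggested constructor (a standalone sketch; only the fields visible in the suggestion are assumed):

```rust
let sv = SparseVector::from_pairs([(3_u32, 0.5_f32), (7, 1.25)]);
assert_eq!(sv.indices, vec![3, 7]);
assert_eq!(sv.values, vec![0.5, 1.25]);
assert!(sv.tokens.is_none());
```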
File: rust/types/src/metadata.rs
Line: 114
Force-pushed: 7865879 to ce0a380
```rust
if let Some(stable_value_token) = stats_value.stable_value_token() {
    metadata.insert(
        "value_token".to_string(),
        UpdateMetadataValue::Str(stable_value_token),
    );
}
```
[TestCoverage]
This new logic to add `value_token` is a great addition. However, it doesn't appear to be covered by tests. The existing tests for sparse vectors seem to only cover cases where tokens are not provided, exercising the `None` path for `stable_value_token`.
To ensure this new functionality is robust, could you please add a test case with a `SparseVector` that includes tokens? This test should assert that the `value_token` field is correctly populated in the resulting statistics records. This would likely require updating test helpers like `extract_metadata_tuple` as well.
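A minimal sketch of such a test, assuming hypothetical helpers (`compute_statistics`, `metadata_with_sparse_vector`) and record accessors rather than the crate's real test utilities:

```rust
#[test]
fn sparse_vector_statistics_populate_value_token() {
    // Construct a sparse vector that carries string tokens alongside numeric indices.
    let sv = SparseVector {
        indices: vec![3, 7],
        values: vec![0.5, 1.25],
        tokens: Some(vec!["foo".to_string(), "bar".to_string()]),
    };

    // Run the statistics function under test and inspect the emitted metadata.
    let records = compute_statistics(vec![metadata_with_sparse_vector("embedding", sv)]);
    for record in &records {
        let metadata = record.metadata.as_ref().expect("statistics metadata");
        assert!(
            matches!(metadata.get("value_token"), Some(UpdateMetadataValue::Str(_))),
            "value_token should be set when tokens are provided"
        );
    }
}
```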
File: rust/worker/src/execution/functions/statistics.rs
Line: 255
Force-pushed: ce0a380 to 36086d2
```rust
// Wrap in Arc to avoid cloning large MaterializeLogOutput data
let log_fetch_records_clone = log_fetch_records.clone();
```
[BestPractice]
**Resource leak in concurrent compaction handling**: The `clone()` operations at lines 603-604 create deep copies of `log_fetch_records` and the entire `CompactionContext` for parallel execution. With large datasets (e.g., 10k records), this duplicates significant memory without cleanup guarantees if one future fails.
```rust
// Current: Clones entire state
let log_fetch_records_clone = log_fetch_records.clone();
let mut self_clone_fn = self.clone();

// Safer approach: Use Arc to share data
let log_fetch_records = Arc::new(log_fetch_records);
let fn_records = Arc::clone(&log_fetch_records);
```
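As a standalone illustration of the `Arc`-sharing pattern (a toy sketch assuming a tokio runtime; the record type and task bodies are placeholders, not the orchestrator's real types):

```rust
use std::sync::Arc;

#[tokio::main]
async fn main() {
    // Placeholder for the large materialized-log data; only one copy is allocated.
    let log_fetch_records = Arc::new(vec![1_u32, 2, 3]);

    // Each concurrent task receives a cheap pointer clone instead of a deep copy.
    let fn_records = Arc::clone(&log_fetch_records);
    let stats_task = tokio::spawn(async move {
        fn_records.len() // stand-in for the statistics function's work
    });

    let compact_records = Arc::clone(&log_fetch_records);
    let compact_task = tokio::spawn(async move {
        compact_records.iter().copied().max() // stand-in for compaction work
    });

    let (count, max) = (stats_task.await.unwrap(), compact_task.await.unwrap());
    println!("records seen: {count}, max value: {max:?}");
}
```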
File: rust/worker/src/execution/orchestration/compact.rs
Line: 604

```rust
    .run_compaction(collection_id, system)
    .await;
let result = Box::pin(compaction_context.run_compaction(collection_id, system)).await;
Box::pin(compaction_context.cleanup()).await;
```
[BestPractice]
**Potential panic from unbounded recursion**: `Box::pin()` at line 723 wraps an async function that can recursively call itself through orchestrator chains. With deep call stacks (e.g., multiple attached functions), this risks stack overflow despite the 8MB thread stack allocation in tests.
```rust
// Add depth tracking to prevent unbounded recursion
pub async fn compact(
    // ... params
    max_depth: Option<u32>,
) -> Result<CompactionResponse, CompactionError> {
    if let Some(depth) = max_depth {
        if depth == 0 {
            return Err(CompactionError::InvariantViolation(
                "Maximum orchestration depth exceeded".to_string()
            ));
        }
    }
    // ... rest of function
}
```
File: rust/worker/src/execution/orchestration/compact.rs
Line: 723

```rust
for hnsw_index_uuid in self.hnsw_index_uuids {
    let _ = HnswIndexProvider::purge_one_id(
        self.hnsw_provider.temporary_storage_path.as_path(),
        hnsw_index_uuid,
    )
    .await;
```
[BestPractice]
**Missing error propagation in cleanup path**: `cleanup()` at line 666 uses `let _ =` to ignore HNSW purge errors. If cleanup fails during error handling (e.g., disk full), subsequent compactions will leak resources without visibility.
```rust
pub(crate) async fn cleanup(self) {
    let mut cleanup_errors = Vec::new();
    for hnsw_index_uuid in self.hnsw_index_uuids {
        if let Err(e) = HnswIndexProvider::purge_one_id(
            self.hnsw_provider.temporary_storage_path.as_path(),
            hnsw_index_uuid,
        ).await {
            cleanup_errors.push((hnsw_index_uuid, e));
        }
    }
    if !cleanup_errors.is_empty() {
        tracing::warn!("Cleanup failures: {:?}", cleanup_errors);
    }
}
```
File: rust/worker/src/execution/orchestration/compact.rs
Line: 671

```rust
(Self::SparseVector(lhs1, lhs2), Self::SparseVector(rhs1, rhs2)) => {
    lhs1 == rhs1 && lhs2 == rhs2
}
```
[BestPractice]
For consistency with the other match arms in this `PartialEq` implementation, you can remove the braces and use a single expression with a trailing comma.
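A sketch of the arm in that style (assuming the surrounding match shown in the diff above):

```rust
(Self::SparseVector(lhs1, lhs2), Self::SparseVector(rhs1, rhs2)) => lhs1 == rhs1 && lhs2 == rhs2,
```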
File: rust/worker/src/execution/functions/statistics.rs
Line: 175

```rust
StatisticsValue::Float(value) => value.to_bits().hash(state),
StatisticsValue::Str(value) => value.hash(state),
StatisticsValue::SparseVector(value) => value.hash(state),
StatisticsValue::SparseVector(value, token) => {
```
`token` -> `label`
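A sketch of what the renamed arm might look like (the hash body is an assumption; only the rename from `token` to `label` is being requested):

```rust
StatisticsValue::SparseVector(value, label) => {
    value.hash(state);
    label.hash(state);
}
```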
Description of changes
We need to put tokens into the outputs of the statistics functions in both numeric (token id) and string (token str) form.
Test plan
CI
Migration plan
N/A
Observability plan
N/A
Documentation Changes
N/A